/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.update;

import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.hamcrest.core.StringContains.containsString;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.PeerSync.MissedUpdatesRequest;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
import org.junit.Test;

@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
@SuppressWarnings("JdkObsolete")
public class PeerSyncTest extends BaseDistributedSearchTestCase {
  /**
   * Number of versions passed as the {@code getVersions} parameter by the sync assertions in this
   * class. NOTE(review): left non-final, presumably so subclasses can adjust it — confirm.
   */
  protected static int numVersions = 100; // number of versions to use when syncing
  /** String form of {@link DistribPhase#FROMLEADER}, used as the distrib-update parameter value. */
  protected static final String FROM_LEADER = DistribPhase.FROMLEADER.toString();
  /**
   * Request parameters that set {@code DISTRIB_UPDATE_PARAM} to {@link #FROM_LEADER}. The instance
   * is shared and mutable, so copy it before adding further parameters.
   */
  protected static final ModifiableSolrParams seenLeader =
      params(DISTRIB_UPDATE_PARAM, FROM_LEADER);

  /**
   * Selects the tlog-enabled config and the default schema for this test, then loads a core once
   * to verify that the schema still defines the fields this test relies on as docValues-only
   * (neither indexed nor stored).
   */
  public PeerSyncTest() {
    stress = 0;

    // TODO: a better way to do this?
    configString = "solrconfig-tlog.xml";
    schemaString = "schema.xml";

    // validate that the schema was not changed to an unexpected state
    try {
      initCore(configString, schemaString);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    IndexSchema schema = h.getCore().getLatestSchema();
    assertDocValuesOnly(schema, "_version_");
    assertDocValuesOnly(schema, "val_i_dvo");
  }

  /**
   * Asserts that {@code fieldName} exists in {@code schema}, has docValues, and is neither indexed
   * nor stored. Each property is checked separately so a failure names the offending field and
   * property instead of surfacing as a bare assertion error or NullPointerException.
   *
   * @param schema the schema to inspect
   * @param fieldName the name of the field whose properties are verified
   */
  private static void assertDocValuesOnly(IndexSchema schema, String fieldName) {
    assertNotNull(
        "schema does not define field '" + fieldName + "'", schema.getFieldOrNull(fieldName));
    assertTrue(
        "field '" + fieldName + "' must have docValues",
        schema.getFieldOrNull(fieldName).hasDocValues());
    assertFalse(
        "field '" + fieldName + "' must not be indexed",
        schema.getFieldOrNull(fieldName).indexed());
    assertFalse(
        "field '" + fieldName + "' must not be stored", schema.getFieldOrNull(fieldName).stored());
  }

  /**
   * End-to-end PeerSync scenario exercised against two of the three fixed shards.
   *
   * <p>Every step follows the same pattern: updates carrying explicit {@code _version_} values are
   * sent to {@code client0} and/or {@code client1} with {@code DISTRIB_UPDATE_PARAM=FROMLEADER},
   * then {@code client1} is asked to sync against {@code shardsArr[0]} through {@code assertSync}
   * and the visible documents of both clients are compared through {@code validateDocs}. The
   * statement order matters: later steps build on the versions written by earlier ones.
   *
   * <p>NOTE(review): the scenario assumes {@code shardsArr[0]} addresses the node behind {@code
   * client0} — confirm against the base class.
   *
   * @throws Exception if any update, commit, query or sync request fails
   */
  @Test
  @ShardsFixed(num = 3)
  public void test() throws Exception {
    // ids expected to be visible on both clients; kept in step with the updates sent below
    Set<Integer> docsAdded = new LinkedHashSet<>();
    handle.clear();
    handle.put("timestamp", SKIPVAL);
    handle.put("score", SKIPVAL);
    handle.put("maxScore", SKIPVAL);

    SolrClient client0 = clients.get(0);
    SolrClient client1 = clients.get(1);

    int v = 0;
    add(client0, seenLeader, sdoc("id", "1", "_version_", ++v));

    // this sync is expected to fail: client1 (the node being asked to sync) has no updates of its
    // own yet, i.e. no context to judge whether applying the updates from the peer will bring it
    // into sync
    assertSync(client1, numVersions, false, shardsArr[0]);

    // bring client1 back into sync with client0 by adding the doc
    add(client1, seenLeader, sdoc("id", "1", "_version_", v));

    // both have the same version list, so sync should now return true
    assertSync(client1, numVersions, true, shardsArr[0]);
    // TODO: test that updates weren't necessary

    client0.commit();
    client1.commit();
    queryAndCompare(params("q", "*:*"), client0, client1);

    add(client0, seenLeader, addRandFields(sdoc("id", "2", "_version_", ++v)));

    // now client1 has the context to sync
    assertSync(client1, numVersions, true, shardsArr[0]);

    client0.commit();
    client1.commit();
    queryAndCompare(params("q", "*:*"), client0, client1);

    add(client0, seenLeader, addRandFields(sdoc("id", "3", "_version_", ++v)));
    add(client0, seenLeader, addRandFields(sdoc("id", "4", "_version_", ++v)));
    add(client0, seenLeader, addRandFields(sdoc("id", "5", "_version_", ++v)));
    add(client0, seenLeader, addRandFields(sdoc("id", "6", "_version_", ++v)));
    add(client0, seenLeader, addRandFields(sdoc("id", "7", "_version_", ++v)));
    add(client0, seenLeader, addRandFields(sdoc("id", "8", "_version_", ++v)));
    add(client0, seenLeader, addRandFields(sdoc("id", "9", "_version_", ++v)));
    add(client0, seenLeader, addRandFields(sdoc("id", "10", "_version_", ++v)));
    // ids 1..10 have been added so far
    for (int i = 0; i < 10; i++) docsAdded.add(i + 1);
    assertSync(client1, numVersions, true, shardsArr[0]);

    validateDocs(docsAdded, client0, client1);

    testOverlap(docsAdded, client0, client1, v);
    // test delete and deleteByQuery
    v = 1000;
    SolrInputDocument doc = sdoc("id", "1000", "_version_", ++v);
    add(client0, seenLeader, doc);
    add(client0, seenLeader, sdoc("id", "1001", "_version_", ++v));
    // deletes are sent with the negated next version
    delQ(
        client0,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", Long.toString(- ++v)),
        "id:1001 OR id:1002");
    add(client0, seenLeader, sdoc("id", "1002", "_version_", ++v));
    del(
        client0,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", Long.toString(- ++v)),
        "1000");
    docsAdded.add(1002); // 1002 added

    assertSync(client1, numVersions, true, shardsArr[0]);
    validateDocs(docsAdded, client0, client1);

    // test that delete by query is returned even if not requested, and that it doesn't delete newer
    // stuff than it should
    v = 2000;
    SolrClient client = client0;
    add(client, seenLeader, sdoc("id", "2000", "_version_", ++v));
    add(client, seenLeader, sdoc("id", "2001", "_version_", ++v));
    delQ(
        client,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", Long.toString(- ++v)),
        "id:2001 OR id:2002");
    add(client, seenLeader, sdoc("id", "2002", "_version_", ++v));
    del(
        client,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", Long.toString(- ++v)),
        "2000");
    docsAdded.add(2002); // 2002 added

    // replay the same sequence on client1, reusing the same version numbers ...
    v = 2000;
    client = client1;
    add(client, seenLeader, sdoc("id", "2000", "_version_", ++v));
    // ... but skip the add of 2001 while still consuming its version number: pretend we missed
    // it. peersync should retrieve it, but should also retrieve any deleteByQuery objects after it
    ++v;
    // add(client, seenLeader, sdoc("id","2001","_version_",++v));
    delQ(
        client,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", Long.toString(- ++v)),
        "id:2001 OR id:2002");
    add(client, seenLeader, sdoc("id", "2002", "_version_", ++v));
    del(
        client,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", Long.toString(- ++v)),
        "2000");

    assertSync(client1, numVersions, true, shardsArr[0]);

    validateDocs(docsAdded, client0, client1);

    //
    // Test that handling reorders work when applying docs retrieved from peer
    //

    // this should cause us to retrieve the delete operation (but not the following add)
    // the reorder in application shouldn't affect anything
    add(client0, seenLeader, sdoc("id", "3000", "_version_", 3001));
    add(client1, seenLeader, sdoc("id", "3000", "_version_", 3001));
    del(client0, params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "3000"), "3000");
    docsAdded.add(3000);

    // this should cause us to retrieve an add that was previously deleted
    // NOTE(review): both deletes below target id 3004 while carrying _version_ 3001; confirm
    // that this id/version pairing is intended
    add(client0, seenLeader, sdoc("id", "3001", "_version_", 3003));
    del(client0, params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "3001"), "3004");
    del(client1, params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "3001"), "3004");

    // this should cause us to retrieve an older add that was overwritten
    add(client0, seenLeader, sdoc("id", "3002", "_version_", 3004));
    add(client0, seenLeader, sdoc("id", "3002", "_version_", 3005));
    add(client1, seenLeader, sdoc("id", "3002", "_version_", 3005));
    docsAdded.add(3001); // 3001 added
    docsAdded.add(3002); // 3002 added

    assertSync(client1, numVersions, true, shardsArr[0]);
    validateDocs(docsAdded, client0, client1);

    // now lets check fingerprinting causes appropriate fails
    v = 4000;
    // doc 4000 goes to client0 only
    add(client0, seenLeader, sdoc("id", Integer.toString(v), "_version_", v));
    docsAdded.add(4000);
    // more docs than the sync window (numVersions), sent to both clients
    int toAdd = numVersions + 10;

    List<SolrInputDocument> additionalDocs = new ArrayList<>(toAdd);
    for (int i = v + 1; i < v + toAdd + 1; i++) {
      additionalDocs.add(sdoc("id", Integer.toString(i), "_version_", i));
      docsAdded.add(i);
    }
    add(client0, seenLeader, additionalDocs);
    add(client1, seenLeader, additionalDocs);

    // client0 now has an additional add beyond our window and the fingerprint should cause this to
    // fail
    assertSync(client1, numVersions, false, shardsArr[0]);

    // if we turn off fingerprinting, it should succeed
    System.setProperty("solr.index.replication.fingerprint.enabled", "false");
    try {
      assertSync(client1, numVersions, true, shardsArr[0]);
    } finally {
      // always restore the default so later steps run with fingerprinting again
      System.clearProperty("solr.index.replication.fingerprint.enabled");
    }

    // let's add the missing document and verify that order doesn't matter
    add(client1, seenLeader, sdoc("id", Integer.toString(v), "_version_", v));
    assertSync(client1, numVersions, true, shardsArr[0]);

    // let's do some overwrites to ensure that repeated updates and maxDoc don't matter
    for (int i = 0; i < 10; i++) {
      // add individually instead of in batch to create more writes
      add(client0, seenLeader, sdoc("id", Integer.toString(v + i + 1), "_version_", v + i + 1));
    }
    assertSync(client1, numVersions, true, shardsArr[0]);

    validateDocs(docsAdded, client0, client1);

    // let's add some in-place updates
    add(
        client0,
        seenLeader,
        sdoc("id", "5000", "val_i_dvo", 0, "title", "mytitle", "_version_", 5000)); // full update
    docsAdded.add(5000);
    assertSync(client1, numVersions, true, shardsArr[0]);
    // verify the fully updated document (id=5000) has correct fields
    assertEquals(0, client1.getById("5000").get("val_i_dvo"));
    assertEquals(
        client0.getById("5000") + " and " + client1.getById("5000"),
        "mytitle",
        client1.getById("5000").getFirstValue("title"));

    // work on a copy so the shared seenLeader params are not modified
    ModifiableSolrParams inPlaceParams = new ModifiableSolrParams(seenLeader);
    inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5000");
    add(
        client0,
        inPlaceParams,
        sdoc("id", "5000", "val_i_dvo", 1, "_version_", 5001)); // in-place update
    assertSync(client1, numVersions, true, shardsArr[0]);
    // verify the in-place updated document (id=5000) has correct fields
    assertEquals(1, client1.getById("5000").get("val_i_dvo"));
    assertEquals(
        client0.getById("5000") + " and " + client1.getById("5000"),
        "mytitle",
        client1.getById("5000").getFirstValue("title"));

    // interleave the in-place updates with a few deletes to other documents
    del(client0, params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "5002"), 4001);
    delQ(client0, params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "5003"), "id:4002");
    docsAdded.remove(4001);
    docsAdded.remove(4002);

    inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5001");
    add(
        client0,
        inPlaceParams,
        sdoc("id", 5000, "val_i_dvo", 2, "_version_", 5004)); // in-place update
    assertSync(client1, numVersions, true, shardsArr[0]);
    // verify the in-place updated document (id=5000) has correct fields
    assertEquals(2, client1.getById("5000").get("val_i_dvo"));
    assertEquals(
        client0.getById("5000") + " and " + client1.getById("5000"),
        "mytitle",
        client1.getById("5000").getFirstValue("title"));

    // a DBQ with value
    delQ(
        client0,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "5005"),
        "val_i_dvo:1"); // current val is 2, so this should not delete anything
    assertSync(client1, numVersions, true, shardsArr[0]);

    // resend the original full update (same _version_ 5000 as the first one)
    add(
        client0,
        seenLeader,
        sdoc("id", "5000", "val_i_dvo", 0, "title", "mytitle", "_version_", 5000)); // full update
    docsAdded.add(5000);
    assertSync(client1, numVersions, true, shardsArr[0]);
    inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "5004");
    add(client0, inPlaceParams, sdoc("id", 5000, "val_i_dvo", 3, "_version_", 5006));
    assertSync(client1, numVersions, true, shardsArr[0]);

    // verify the in-place updated document (id=5000) has correct fields
    assertEquals(3, client1.getById("5000").get("val_i_dvo"));
    assertEquals(
        client0.getById("5000") + " and " + client1.getById("5000"),
        "mytitle",
        client1.getById("5000").getFirstValue("title"));

    validateDocs(docsAdded, client0, client1);

    del(client0, params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "5007"), 5000);
    docsAdded.remove(5000);
    assertSync(client1, numVersions, true, shardsArr[0]);

    validateDocs(docsAdded, client0, client1);

    // if doc with id=6000 is deleted, further in-place-updates should fail
    add(
        client0,
        seenLeader,
        sdoc("id", "6000", "val_i_dvo", 6, "title", "mytitle", "_version_", 6000)); // full update
    delQ(
        client0,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "6004"),
        "val_i_dvo:6"); // val_i_dvo of id=6000 is 6, so this is expected to delete id=6000
    assertSync(client1, numVersions, true, shardsArr[0]);
    SolrException ex =
        expectThrows(
            SolrException.class,
            () -> {
              inPlaceParams.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, "6000");
              add(client0, inPlaceParams, sdoc("id", 6000, "val_i_dvo", 6003, "_version_", 5007));
            });
    assertEquals(ex.toString(), SolrException.ErrorCode.SERVER_ERROR.code, ex.code());
    assertThat(ex.getMessage(), containsString("Can't find document with id=6000"));

    // Reordered DBQ with Child-nodes (SOLR-10114)
    // start over with an empty expectation; the "id:*" delete below accounts for earlier docs
    docsAdded.clear();

    // Reordered full delete should not delete child-docs
    add(client0, seenLeader, sdocWithChildren(7001, "7001", 2)); // add with later version
    docsAdded.add(7001);
    docsAdded.add(7001001);
    docsAdded.add(7001002);
    delQ(
        client0,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "7000"),
        "id:*"); // reordered delete
    assertSync(client1, numVersions, true, shardsArr[0]);
    validateDocs(docsAdded, client0, client1);

    // Reordered DBQ should not affect update
    add(client0, seenLeader, sdocWithChildren(8000, "8000", 5)); // add with later version
    delQ(
        client0,
        params(DISTRIB_UPDATE_PARAM, FROM_LEADER, "_version_", "8002"),
        "id:8500"); // not found, arrives earlier
    add(client0, seenLeader, sdocWithChildren(8000, "8001", 2)); // update with two children
    docsAdded.add(8000);
    docsAdded.add(8000001);
    docsAdded.add(8000002);
    assertSync(client1, numVersions, true, shardsArr[0]);
    validateDocs(docsAdded, client0, client1);

    // the two shard request purposes must be distinct values
    assertNotEquals(
        PeerSync.SHARD_REQUEST_PURPOSE_GET_UPDATES, PeerSync.SHARD_REQUEST_PURPOSE_GET_VERSIONS);

    // finally run the static checks of PeerSync's version-range computation
    handleVersionsWithRangesTests();
  }

  /**
   * Checks that a sync is refused while the two version lists overlap too little, and succeeds
   * once the syncing node has caught up on part of the missing updates.
   *
   * @param docsAdded running set of expected ids; the ids added to {@code client0} are recorded
   * @param client0 client that receives the full run of new documents
   * @param client1 client that is asked to sync and initially receives none of them
   * @param v last version used by the caller; new documents use {@code v + 1} onwards
   * @throws IOException if a request to either client fails
   * @throws SolrServerException if a request to either client fails
   */
  protected void testOverlap(Set<Integer> docsAdded, SolrClient client0, SolrClient client1, long v)
      throws IOException, SolrServerException {
    final int firstId = 11;

    // a long run of documents that only client0 gets to see
    final int leaderOnlyCount = (int) (numVersions * .95);
    for (int offset = 0; offset < leaderOnlyCount; offset++) {
      int id = firstId + offset;
      add(client0, seenLeader, sdoc("id", Integer.toString(id), "_version_", v + offset + 1));
      docsAdded.add(id);
    }

    // too little overlap between the version lists to be confident, so the sync must fail
    assertSync(client1, numVersions, false, shardsArr[0]);

    // replay the first part of that run on client1: just enough to create sufficient overlap
    final int catchUpCount = (int) (numVersions * .25);
    for (int offset = 0; offset < catchUpCount; offset++) {
      int id = firstId + offset;
      add(client1, seenLeader, sdoc("id", Integer.toString(id), "_version_", v + offset + 1));
    }

    assertSync(client1, numVersions, true, shardsArr[0]);
    validateDocs(docsAdded, client0, client1);
  }

  /**
   * Commits on both clients, runs the same match-all query against each of them through {@code
   * queryAndCompare}, and checks the returned ids against {@code docsAdded}.
   *
   * @param docsAdded ids expected to be returned
   * @param client0 first client to commit and query
   * @param client1 second client to commit and query
   * @throws SolrServerException if a commit or query fails
   * @throws IOException if a commit or query fails
   */
  protected void validateDocs(Set<Integer> docsAdded, SolrClient client0, SolrClient client1)
      throws SolrServerException, IOException {
    client0.commit();
    client1.commit();
    ModifiableSolrParams matchAll =
        params("q", "*:*", "rows", "10000", "sort", "_version_ desc,id desc");
    validateQACResponse(docsAdded, queryAndCompare(matchAll, client0, client1));
  }

  /**
   * Sends a {@code /get} request with {@code getVersions} and {@code sync} parameters to {@code
   * client} and asserts on the {@code sync} entry of the response.
   *
   * @param client the client that is asked to sync
   * @param numVersions value sent as the {@code getVersions} parameter
   * @param expectedResult the expected value of the {@code sync} response entry
   * @param syncWith the peers to sync with, sent comma-joined as the {@code sync} parameter
   * @throws IOException if the request fails
   * @throws SolrServerException if the request fails
   */
  void assertSync(SolrClient client, int numVersions, boolean expectedResult, String... syncWith)
      throws IOException, SolrServerException {
    String peers = StrUtils.join(Arrays.asList(syncWith), ',');
    ModifiableSolrParams syncParams =
        params("qt", "/get", "getVersions", Integer.toString(numVersions), "sync", peers);
    NamedList<?> rsp = client.request(new QueryRequest(syncParams));
    Boolean synced = (Boolean) rsp.get("sync");
    assertEquals(expectedResult, synced);
  }

  /**
   * Asserts that the ids returned in {@code qacResponse} are exactly {@code docsAdded} and that
   * the reported number of hits equals the size of that set.
   *
   * @param docsAdded the expected ids
   * @param qacResponse the query response whose results are inspected
   */
  void validateQACResponse(Set<Integer> docsAdded, QueryResponse qacResponse) {
    Set<Integer> returnedIds = new LinkedHashSet<>();
    qacResponse
        .getResults()
        .forEach(doc -> returnedIds.add(Integer.valueOf(doc.getFieldValue("id").toString())));
    assertEquals(docsAdded, returnedIds);
    assertEquals(docsAdded.size(), qacResponse.getResults().getNumFound());
  }

  /**
   * Runs every {@code handleVersionsWithRanges} scenario of this class in a fixed order; the
   * parameterized scenarios run with {@code false} first and {@code true} second.
   */
  private static void handleVersionsWithRangesTests() {
    final boolean[] bothFlags = {false, true};

    testHandleVersionsWithRangesNoOther();
    testHandleVersionsWithRangesSameOne();
    for (boolean highestMissing : bothFlags) {
      testHandleVersionsWithRangesMissingOneOfTwo(highestMissing);
    }
    testHandleVersionsWithRangesMissingMiddleOfThree();
    for (boolean duplicateMiddle : bothFlags) {
      testHandleVersionsWithRangesMissingOneRange(duplicateMiddle);
    }
    testHandleVersionsWithRangesMissingTwoRanges();
  }

  /**
   * The peer reports no versions while we hold a single one: nothing is expected to be requested,
   * whether or not a complete list is wanted.
   */
  private static void testHandleVersionsWithRangesNoOther() {
    final boolean[] completeListModes = {false, true};
    for (boolean completeList : completeListModes) {
      // the peer has nothing, we have exactly one version
      List<Long> peerVersions = Collections.emptyList();
      List<Long> localVersions = Collections.singletonList(42L);
      assertEquals(1, localVersions.size());
      long lowThreshold = localVersions.get(0);

      MissedUpdatesRequest request =
          PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
              peerVersions, completeList, localVersions, lowThreshold);

      // the peer has nothing to offer, so nothing is requested
      assertEquals(0L, request.totalRequestedUpdates);
      assertNull(request.versionsAndRanges);
    }
  }

  /**
   * The peer and we hold the same single version: nothing is expected to be requested, whether or
   * not a complete list is wanted.
   */
  private static void testHandleVersionsWithRangesSameOne() {
    final boolean[] completeListModes = {false, true};
    for (boolean completeList : completeListModes) {
      List<Long> peerVersions = Collections.singletonList(42L);
      List<Long> localVersions = Collections.singletonList(42L);
      assertEquals(1, localVersions.size());
      long lowThreshold = localVersions.get(0);

      MissedUpdatesRequest request =
          PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
              peerVersions, completeList, localVersions, lowThreshold);

      // both sides hold identical versions, so nothing is requested
      assertEquals(0L, request.totalRequestedUpdates);
      assertNull(request.versionsAndRanges);
    }
  }

  /**
   * The peer holds two versions and we hold only one of them.
   *
   * @param highestMissing {@code true} to drop the first (highest) version from our list, {@code
   *     false} to drop the last (lowest) one
   */
  private static void testHandleVersionsWithRangesMissingOneOfTwo(boolean highestMissing) {
    for (boolean completeList : new boolean[] {false, true}) {
      LinkedList<Long> peerVersions = new LinkedList<>(List.of(44L, 22L));
      LinkedList<Long> localVersions = new LinkedList<>(peerVersions);
      // drop either end of our copy of the descending list
      final Long missing =
          highestMissing ? localVersions.removeFirst() : localVersions.removeLast();
      assertEquals(1, localVersions.size());
      long lowThreshold = localVersions.get(0);

      MissedUpdatesRequest request =
          PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
              peerVersions, completeList, localVersions, lowThreshold);

      if (!highestMissing && !completeList) {
        /*
         * nothing is requested: we already hold the highest/latest version and no
         * complete list is needed, so missing an earlier-than-latest version is okay
         */
        assertTrue(missing < lowThreshold);
        assertEquals(0L, request.totalRequestedUpdates);
        assertNull(request.versionsAndRanges);
      } else {
        /*
         * exactly the missing version is requested: either it is the highest (i.e. latest)
         * one, or it is not the highest/latest but a complete list is needed
         */
        assertEquals(1L, request.totalRequestedUpdates);
        assertEquals(missing + "..." + missing, request.versionsAndRanges);
      }
    }
  }

  /**
   * The peer holds three versions and we lack the middle one; checked once with our lowest and
   * once with our highest version as the low threshold.
   */
  private static void testHandleVersionsWithRangesMissingMiddleOfThree() {
    for (boolean completeList : new boolean[] {false, true}) {
      List<Long> peerVersions = List.of(55L, 33L, 11L);
      LinkedList<Long> localVersions = new LinkedList<>(peerVersions);
      // remove by index: the middle entry of our copy
      Long missing = localVersions.remove(localVersions.size() / 2);
      assertEquals(33L, missing.longValue());
      final String expectedRange = missing + "..." + missing;

      // threshold = lowest entry of the descending list
      {
        long lowThreshold = localVersions.getLast();
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        // the single missing update is requested
        assertEquals(1L, request.totalRequestedUpdates);
        assertEquals(expectedRange, request.versionsAndRanges);
      }

      // threshold = highest entry of the descending list
      {
        long lowThreshold = localVersions.getFirst();
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        if (!completeList) {
          /*
           * nothing is requested: no complete list is needed and the version we
           * lack lies below the chosen 'low' threshold
           */
          assertTrue(missing < lowThreshold);
          assertEquals(0L, request.totalRequestedUpdates);
          assertNull(request.versionsAndRanges);
        } else {
          // the single missing update is requested
          assertEquals(1L, request.totalRequestedUpdates);
          assertEquals(expectedRange, request.versionsAndRanges);
        }
      }
    }
  }

  /**
   * The peer holds versions 9 down to 1 and we lack the contiguous run 4 to 6; checked with four
   * different low thresholds (our lowest version, 3, 7 and our highest version).
   *
   * @param duplicateMiddle {@code true} to list version 5 twice in the peer's versions, which
   *     raises the expected number of requested updates from 3 to 4
   */
  private static void testHandleVersionsWithRangesMissingOneRange(boolean duplicateMiddle) {
    final String missingRange = "4...6";
    for (boolean completeList : new boolean[] {false, true}) {
      final List<Long> peerVersions;
      if (duplicateMiddle) {
        peerVersions = List.of(9L, 8L, 7L, 6L, 5L, 5L, 4L, 3L, 2L, 1L);
      } else {
        peerVersions = List.of(9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L);
      }
      LinkedList<Long> localVersions = new LinkedList<>(List.of(9L, 8L, 7L, 3L, 2L, 1L));
      long expectedTotal = duplicateMiddle ? 4L : 3L;

      // threshold = lowest entry of the descending list: everything missing is requested
      {
        long lowThreshold = localVersions.getLast();
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        assertEquals(expectedTotal, request.totalRequestedUpdates);
        assertEquals(missingRange, request.versionsAndRanges);
      }

      // thresholds right next to the gap: 3 (min of gap minus 1), then 7 (max of gap plus 1)
      for (long lowThreshold : new long[] {3, 7}) {
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        if (!completeList) {
          // nothing is requested. NOTE(review): presumably because the peer's lowest version
          // is already below the threshold; confirm against handleVersionsWithRanges
          assertEquals(0L, request.totalRequestedUpdates);
          assertNull(request.versionsAndRanges);
        } else {
          // a complete list is wanted, so everything missing is requested
          assertEquals(expectedTotal, request.totalRequestedUpdates);
          assertEquals(missingRange, request.versionsAndRanges);
        }
      }

      // threshold = highest entry of the descending list
      {
        long lowThreshold = localVersions.getFirst();
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        if (!completeList) {
          // no complete list is needed, so nothing is requested ...
          assertEquals(0L, request.totalRequestedUpdates);
          assertNull(request.versionsAndRanges);
          // ... and every version we lack is older/lower than the 'low' threshold
          for (Long peerVersion : peerVersions) {
            if (localVersions.contains(peerVersion)) {
              continue;
            }
            assertTrue(peerVersion < lowThreshold);
          }
        } else {
          // a complete list is wanted, so everything missing is requested
          assertEquals(expectedTotal, request.totalRequestedUpdates);
          assertEquals(missingRange, request.versionsAndRanges);
        }
      }
    }
  }

  /**
   * The peer holds versions 9 down to 1 and we hold only its first, middle and last entries, so
   * two separate runs (2 to 4 and 6 to 8) are missing; checked with our lowest, middle and highest
   * version as the low threshold.
   */
  private static void testHandleVersionsWithRangesMissingTwoRanges() {
    final String missingRanges = "2...4,6...8";
    for (boolean completeList : new boolean[] {false, true}) {
      LinkedList<Long> peerVersions =
          new LinkedList<>(List.of(9L, 8L, 7L, 6L, 5L, 4L, 3L, 2L, 1L));
      Long newest = peerVersions.getFirst();
      Long middle = peerVersions.get(peerVersions.size() / 2);
      Long oldest = peerVersions.getLast();
      LinkedList<Long> localVersions = new LinkedList<>(List.of(newest, middle, oldest));

      // threshold = lowest entry of the descending list: everything missing is requested
      {
        long lowThreshold = localVersions.getLast();
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        assertEquals(6L, request.totalRequestedUpdates);
        assertEquals(missingRanges, request.versionsAndRanges);
      }

      // threshold = middle entry of the descending list
      {
        long lowThreshold = localVersions.get(localVersions.size() / 2);
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        if (!completeList) {
          // nothing is requested. NOTE(review): presumably because the peer's lowest version
          // is already below the threshold; confirm against handleVersionsWithRanges
          assertEquals(0L, request.totalRequestedUpdates);
          assertNull(request.versionsAndRanges);
        } else {
          // everything missing is requested
          assertEquals(6L, request.totalRequestedUpdates);
          assertEquals(missingRanges, request.versionsAndRanges);
        }
      }

      // threshold = highest entry of the descending list
      {
        long lowThreshold = localVersions.getFirst();
        MissedUpdatesRequest request =
            PeerSync.MissedUpdatesFinderBase.handleVersionsWithRanges(
                peerVersions, completeList, localVersions, lowThreshold);
        if (!completeList) {
          // no complete list is needed, so nothing is requested ...
          assertEquals(0L, request.totalRequestedUpdates);
          assertNull(request.versionsAndRanges);
          // ... and every version we lack is older/lower than the 'low' threshold
          for (Long peerVersion : peerVersions) {
            if (localVersions.contains(peerVersion)) {
              continue;
            }
            assertTrue(peerVersion < lowThreshold);
          }
        } else {
          // a complete list is wanted, so everything missing is requested
          assertEquals(6L, request.totalRequestedUpdates);
          assertEquals(missingRanges, request.versionsAndRanges);
        }
      }
    }
  }
}
